SESのログをイベント種別毎に分類して保存するFirehoseをCloudFormationで作成してみた

SESのログをイベント種別毎に分類して保存するFirehoseをCloudFormationで作成してみた

AWSブログで紹介されている、AWSサービスを利用したSESのログ解析。 Firehoseの動的パーティションを追加する事で、より簡単な設定、効率の良い解析を実現してみました。
Clock Icon2024.03.10

Amazon SES のイベントデータを AWSサービスの Firehose、S3、Athena、QuickSightを利用して解析する方法がAWSブログで紹介されています。

今回、AWSブログで紹介されている CloudFormationテンプレートを改造、 Firehose の 動的パーティション機能を利用し、、ドメイン(ID)、イベントタイプ別に分類されたS3保存を実現。

Amazon SESの 一つの設定セットのイベント送信設定を実施する事で、簡単にAthenaでログ解析を可能とする利用を試す機会がありましたので、紹介させていただきます。

構成図

オリジナル

AWSブログの構成図 Analyzing Amazon SES event data with AWS Analytics Services

更新版

改正版構成図

Firehose

  • Firehoseに動的パーティション設定を追加しました。
  • SESのJSON形式のイベントログを、JQを利用してパース。「SenderIdentity」(SES登録のドメイン)と、「EventType」(SESのイベント種別)を動的パーティション対象としました。
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      ExtendedS3DestinationConfiguration:
        Prefix: "partitioned/!{partitionKeyFromQuery:SenderIdentity}/!{partitionKeyFromQuery:EventType}/!{timestamp:yyyy/MM/dd}/"
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: '{eventType:.eventType}'
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6
        DynamicPartitioningConfiguration:
          Enabled: true
          RetryOptions:
            DurationInSeconds: 300

S3

  • ログ出力先のS3バケットは、ストレージ費用節約のため 40日で自動削除する ライフサイクル管理を追加しました。
  s3BucketSESEvents:
    Type: AWS::S3::Bucket
    Properties:
      LifecycleConfiguration:
        Rules:
          - Id: !Sub 'Delete-After-40-Days'
            ExpirationInDays: 40
            Status: Enabled

設定手順

CloudFormation

修正を反映したFirehose設置用のテンプレートをダウンロードし、任意のスタック名でデプロイします。

‐ デプロイ完了後の設置リソース

Cfnリソース

SES

Firehoseへのイベント記録を有効とした SESの設定セットを用意します。

イベントタイプ

今回、イベントタイプは「すべて」を選択

イベントタイプの選択

送信先

  • 送信先タイプは「Firehose」を選択
  • 配信ストリームは、CloudFormationで作成したFirehoseのストリームを選択
  • IAMロールは、、CloudFormationで作成した、「<スタック名>-ConfigSetPermissionPutFirehose-*」 を選択。 (SESから Firehoseへの記録を許可します)

送信先の指定

作成した設定セットは、デフォルトとして SESの設定済みドメイン(ID)に登録しました。

デフォルト設定セット

動作確認

SES で送信失敗、バウンスメールとして扱われたログの確認を試みました。

バウンスメールの再現

バウンスメールを発生させるため、SESのシミュレーターとして用意されているメールアドレスを利用しました。

バウンス – 受取人のEメールプロバイダーは、SMTP 550 5.1.1 レスポンスコード (「不明なユーザー」) レスポンスコードで E メールを拒否します

[email protected]

シミュレーターを使用した Amazon SES でのテストメール送信

[email protected]」宛にテストメールの送信を試み、バウンスイベントを発生させました。

テスト_E_メールの送信

S3

s3://aws-s3-ses-analytics-<アカウント>-<リージョン>/partitioned/<ドメイン>/ 以下の「Bounce」以下、日付を含むパスで バウンスイベントのログが記録されました。

S3オブジェクト確認

S3 Selectを利用して、バウンスイベントの詳細を確認出来ました。

S3Selectバウンスメール確認

Athena

パーティションなし

「/Bounce/」までのS3のパスを指定、Athenaでテーブルを定義しました。

CREATE EXTERNAL TABLE sesmaster_bounce (
eventType string,
bounce struct < bouncedrecipients: array < struct < action: string,
diagnosticcode: string,
emailaddress: string,
status: string >>,
bouncesubtype: string,
bouncetype: string,
feedbackid: string,
reportingmta: string,
`timestamp`: string >,
mail struct < timestamp: string,
source: string,
sourcearn: string,
sendingaccountid: string,
messageid: string,
destination: string,
headerstruncated: boolean,
headers: array < struct < name: string,
value: string >>,
commonheaders: struct < `from`: array < string >,
to: array < string >,
messageid: string,
subject: string >,
tags: struct < ses_source_tls_version: string,
ses_operation: string,
ses_configurationset: string,
ses_source_ip: string,
ses_outgoing_ip: string,
ses_from_domain: string,
ses_caller_identity: string >>
)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
"mapping.ses_caller_identity" = "ses:caller-identity",
"mapping.ses_configurationset" = "ses:configuration-set",
"mapping.ses_from_domain" = "ses:from-domain",
"mapping.ses_operation" = "ses:opeation",
"mapping.ses_outgoing_ip" = "ses:outgoing-ip",
"mapping.ses_source_ip" = "ses:source-ip",
"mapping.ses_source_tls_version" = "ses:source-tls-version"
)
LOCATION 's3://aws-s3-ses-analytics-<aws-account-number>/partitioned/<Identity>/Bounce/'
  • 実行例

Athenaバウンスメール確認

  • SQL
SELECT * FROM "default"."sesmaster_bounce" ;

パーティション投影有効

  • パーティション投影を利用、ドメイン、イベントタイプ、日付で対象を絞り込む利用を試しました。
CREATE EXTERNAL TABLE sesmaster_partition (
eventType string,
complaint struct < arrivaldate: string,
complainedrecipients: array < struct < emailaddress: string >>,
complaintfeedbacktype: string,
feedbackid: string,
`timestamp`: string,
useragent: string >,
bounce struct < bouncedrecipients: array < struct < action: string,
diagnosticcode: string,
emailaddress: string,
status: string >>,
bouncesubtype: string,
bouncetype: string,
feedbackid: string,
reportingmta: string,
`timestamp`: string >,
mail struct < timestamp: string,
source: string,
sourcearn: string,
sendingaccountid: string,
messageid: string,
destination: string,
headerstruncated: boolean,
headers: array < struct < name: string,
value: string >>,
commonheaders: struct < `from`: array < string >,
to: array < string >,
messageid: string,
subject: string >,
tags: struct < ses_source_tls_version: string,
ses_operation: string,
ses_configurationset: string,
ses_source_ip: string,
ses_outgoing_ip: string,
ses_from_domain: string,
ses_caller_identity: string >>,
send string,
delivery struct < processingtimemillis: int,
recipients: array < string >,
reportingmta: string,
smtpresponse: string,
`timestamp`: string >,
open struct < ipaddress: string,
`timestamp`: string,
userAgent: string >,
reject struct < reason: string >,
click struct < ipAddress: string,
`timestamp`: string,
userAgent: string,
link: string >
)
PARTITIONED BY (sender_identity string, event_type string, day string)
ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe'
WITH SERDEPROPERTIES (
"mapping.ses_caller_identity" = "ses:caller-identity",
"mapping.ses_configurationset" = "ses:configuration-set",
"mapping.ses_from_domain" = "ses:from-domain",
"mapping.ses_operation" = "ses:opeation",
"mapping.ses_outgoing_ip" = "ses:outgoing-ip",
"mapping.ses_source_ip" = "ses:source-ip",
"mapping.ses_source_tls_version" = "ses:source-tls-version"
)
LOCATION 's3://aws-s3-ses-analytics-<aws-account-number>/partitioned/'
TBLPROPERTIES (
 "projection.enabled" = "true",
 "projection.sender_identity.type" = "injected",
 "projection.event_type.type" = "enum",
 "projection.event_type.values" = "Bounce,Complaint,Delivery,Send,Reject,Open,Click,`Rendering Failure`,DeliveryDelay,Subscription",
 "projection.day.type" = "date",
 "projection.day.format" = "yyyy/MM/dd",
 'projection.day.range' = 'NOW-40DAYS,NOW',
 "projection.day.interval" = "1",
 "projection.day.interval.unit" = "DAYS",
 "storage.location.template" = "s3://aws-s3-ses-analytics-<aws-account-number>/partitioned/${sender_identity}/${event_type}/${day}/"
)   

実行例

  • 指定した年月日、ドメインで発生したバウンスメールの参照を試みました

Athenaバウンスメール確認パーティション版

  • SQL
SELECT * FROM sesmaster_partition
where  sender_identity ='<ドメイン>'
and event_type = 'Bounce'
and day ='2024/03/08'
  • 当日、前日のBounceメールを対象とし、バウンス内容を展開を試みました

バウンス内容を展開

SELECT
  c.*,
  b.action as bounceaction,
  b.diagnosticcode as bouncediagnosticcode,
  b.emailaddress as bounceemailaddress
from
  (
    select
      messageid,
      recipient.action,
      recipient.diagnosticcode,
      recipient.emailaddress
    FROM
      (
        SELECT
          mail.messageId as messageid,
          bounce.bouncedrecipients as bouncedrecipients
        FROM
          "default"."sesmaster_partition"
        where
          sender_identity = '<ドメイン>'
          and day in (
            date_format(current_timestamp, '%Y/%m/%d'),
            date_format(current_timestamp - interval '1' day, '%Y/%m/%d')
          )
          and EventType = 'Bounce'
      ) a
      CROSS JOIN UNNEST(bouncedrecipients) as t(recipient)
  ) b,
  (
    SELECT
      eventtype as eventtype,
      mail.messageId as mailmessageid,
      mail.timestamp as mailtimestamp,
      mail.source as mailsource,
      mail.sendingAccountId as mailsendingAccountId,
      mail.commonHeaders.subject as mailsubject,
      mail.tags.ses_configurationset as mailses_configurationset,
      mail.tags.ses_source_ip as mailses_source_ip,
      mail.tags.ses_from_domain as mailses_from_domain,
      mail.tags.ses_outgoing_ip as mailses_outgoing_ip,
      bounce.bounceType as bouncebounceType,
      bounce.bouncesubtype as bouncebouncesubtype,
      bounce.feedbackid as bouncefeedbackid,
      bounce.timestamp as bouncetimestamp,
      bounce.reportingMTA as bouncereportingmta
    FROM
      "default"."sesmaster_partition"
    where
      sender_identity = '<ドメイン>'
      and day in (
        date_format(current_timestamp, '%Y/%m/%d'),
        date_format(current_timestamp - interval '1' day, '%Y/%m/%d')
      )
      and EventType = 'Bounce'
  ) c
where
  b.messageid = c.mailmessageid
  • 実行結果
#  eventtype   mailmessageid   mailtimestamp   mailsource  mailsendingAccountId    mailsubject mailses_configurationset    mailses_source_ip   mailses_from_domain mailses_outgoing_ip bouncebounceType    bouncebouncesubtype bouncefeedbackid    bouncetimestamp bouncereportingmta  bounceaction    bouncediagnosticcode    bounceemailaddress
2   Bounce  0106018e1f0f23d1-60dd8a27-b3d5-47d3-9cd0-335ce124ebd0-000000    2024-03-08T17:14:29.713Z    test-bounce-0309@<domain>   000000000000    test-bounce ["default-1"]   ["162.120.155.65"]  ["<domain>"]        Permanent   General 0106018e1f0f2751-e58a1217-7740-40b5-ac80-fdc4b3ab6fdb-000000    2024-03-08T17:14:30.701Z    dns; e234-12.smtp-out.ap-northeast-1.amazonses.com  failed  smtp; 550 5.1.1 user unknown    [email protected]

費用

約1万通のメールをSESで送信したアカウント、Firehoseの費用で要したコストは0.016ドルでした。

Firehose費用

約1万通を送信したSESの利用費は、約1.1ドル。Firehose 導入によるコスト増加の影響は1%台でした。

SES費用

まとめ

Amazon SESから送信したメール、配送遅延や不着の調査原因を行う場合、SESのイベントログは重要な手がかりとなります。

特に不特定多数、外部メールアドレス宛に Amazon SESを利用して メールを一定以上の規模で送信する場合には、予期せぬ問題発生に備えて、今回紹介させて戴いた SESログ記録用の Firehose と、 Athenaなどの準備をお勧めします。

CloudFormationテンプレート

AWSTemplateFormatVersion:  2010-09-09
Description: 'S3 Bucket and Firehose with dynamic partition support (senders,EventType) that uses SES Config Set'
Resources:    
  s3BucketSESEvents:
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      BucketName: 
        !Join
          - ''
          - - 'aws-s3-ses-analytics-'
            - !Ref AWS::AccountId
            - '-'
            - !Ref AWS::Region
      LifecycleConfiguration:
        Rules:
          - Id: !Sub 'Delete-After-40-Days'
            ExpirationInDays: 40
            Status: Enabled

  S3SESEventsBucketPolicy:
    Type: AWS::S3::BucketPolicy
    Properties:
      Bucket: !Ref s3BucketSESEvents
      PolicyDocument:
        Version: 2012-10-17
        Statement:
          - 
            Action:
              - s3:GetObject
              - s3:ListBucket
              - s3:PutObject
            Effect: Allow
            Resource:
              - !Sub arn:aws:s3:::${s3BucketSESEvents}
              - !Sub arn:aws:s3:::${s3BucketSESEvents}/*
            Principal:
              AWS: !GetAtt ConfigSetPermissionPutFirehose.Arn

  firehoseSESRole:
    Type: AWS::IAM::Role
    Properties:
        AssumeRolePolicyDocument:
            Version: 2012-10-17
            Statement:
            -
                Effect: Allow
                Principal:
                    Service: firehose.amazonaws.com
                Action: 'sts:AssumeRole'
                Condition:
                    StringEquals:
                     'sts:ExternalId': !Ref 'AWS::AccountId'
        Policies:
          -
            PolicyName: !Join
                - ''
                - - 'aws-ses-analytics-kinesis-policy-'
                  - !Ref AWS::StackName
            PolicyDocument:
                Version: 2012-10-17
                Statement:
                - 
                    Effect: Allow
                    Action:
                        - 's3:AbortMultipartUpload'
                        - 's3:GetBucketLocation'
                        - 's3:GetObject'
                        - 's3:ListBucket'
                        - 's3:ListBucketMultipartUploads'
                        - 's3:PutObject'
                        - 'lambda:InvokeFunction' 
                        - 'lambda:GetFunctionConfiguration'  
                    Resource:
                    - !Join 
                        - ''
                        - - 'arn:aws:s3:::'
                          - !Ref s3BucketSESEvents
                    - !Join 
                        - ''
                        - - 'arn:aws:s3:::'
                          - !Ref s3BucketSESEvents
                          - '*'

  firehoseDeliveryStream: 
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamEncryptionConfigurationInput:
        KeyType: AWS_OWNED_CMK 
      DeliveryStreamName: 
        !Join
          - ''
          - - 'aws-ses-analytics-kinesis-'
            - !Ref AWS::AccountId
      ExtendedS3DestinationConfiguration:
        BucketARN: !Join 
          - ''
          - - 'arn:aws:s3:::'
            - !Ref s3BucketSESEvents
        BufferingHints:
          IntervalInSeconds: 900
          SizeInMBs: 128
        CloudWatchLoggingOptions:
          Enabled: true
          LogGroupName: !Ref 'FirehoseCWLogGroup'
          LogStreamName: S3Delivery
        CompressionFormat: GZIP
        ErrorOutputPrefix: '!{firehose:error-output-type}/'
        Prefix: "partitioned/!{partitionKeyFromQuery:SenderIdentity}/!{partitionKeyFromQuery:EventType}/!{timestamp:yyyy/MM/dd}/"
        RoleARN: !GetAtt firehoseSESRole.Arn
        ProcessingConfiguration:
          Enabled: true
          Processors:
            - Type: MetadataExtraction
              Parameters:
                - ParameterName: MetadataExtractionQuery
                  ParameterValue: '{SenderIdentity: .mail.tags."ses:sender-identity"[0], EventType:.eventType}'
                - ParameterName: JsonParsingEngine
                  ParameterValue: JQ-1.6
        DynamicPartitioningConfiguration:
          Enabled: true
          RetryOptions:
            DurationInSeconds: 300

  ConfigSetPermissionPutFirehose:
    Type: AWS::IAM::Role
    Properties: 
        AssumeRolePolicyDocument: 
          Version: "2012-10-17"
          Statement: 
          - 
            Effect: "Allow"
            Principal: 
                Service: ses.amazonaws.com
            Action: sts:AssumeRole
        Path: '/service-role/'
        Policies: 
          - 
            PolicyName: !Join
                - ''
                - - 'AWS-ses-config-set-destination-'
                  - !Ref AWS::StackName
            PolicyDocument:
                Version: '2012-10-17'
                Statement:
                    - 
                      Effect: Allow
                      Action:
                          - firehose:PutRecord
                          - firehose:PutRecordBatch
                      Resource: 
                        - !Join 
                          - ''
                          - - 'arn:aws:firehose:'
                            - !Ref AWS::Region
                            - ':'
                            - !Ref AWS::AccountId
                            - ':deliverystream/'
                            - !Join
                              - ''
                              - - 'aws-ses-analytics-kinesis-'
                                - !Ref AWS::AccountId
                    - 
                      Effect: Allow
                      Action:
                          - logs:PutLogEvents
                      Resource: !Sub '${FirehoseCWLogGroup.Arn}'

  FirehoseCWLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub '/aws/firehose/aws-ses-analytics-kinesis-${AWS::AccountId}'
      RetentionInDays: 7
  FirehoseCWLogStream:
    Type: AWS::Logs::LogStream
    Properties:
      LogGroupName: !Ref 'FirehoseCWLogGroup'
      LogStreamName: S3Delivery

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.